package org.luosl.webmagicx.pipeline

import java.io._
import java.util.concurrent.locks.{Lock, ReentrantLock}

import org.luosl.webmagicx.conf.{Props, SpiderConf}
import us.codecraft.webmagic.{ResultItems, Task}
import org.apache.commons.csv.{CSVFormat, CSVParser, CSVPrinter}
import org.eclipse.jetty.util.ConcurrentHashSet
import org.luosl.webmagicx.pipeline.component.{Distinct, HashSetDistinct}

import scala.collection.JavaConverters._


/**
  * Created by luosl on 2017/12/6.
  *
  * Pipeline that writes every processed [[ResultItems]] as one record of a CSV file.
  *
  * Properties read from `props`:
  *  - `path`: location of the CSV file (no default is supplied here).
  *  - `charset`: encoding used for reading and writing, `utf-8` by default.
  *  - `needSaveFields`: `*` (default) selects `_url` plus every field of the spider
  *    configuration, sorted by name; otherwise a list of field names separated by
  *    an ASCII or full-width comma.
  *  - `distinctField`: optional; when present, a record is only written if
  *    `Distinct.isUniqueAndAdd` accepts it.
  *  - `model`: `append` or `override`; any other value aborts construction.
  *  - `closeAtTheEnd`: `true` (default) closes the printer in [[close]], otherwise it is
  *    only flushed.
  *
  * @param sc    spider configuration (field names and configuration path are read from it)
  * @param props pipeline properties described above
  */
class CSVPipeline(sc:SpiderConf, props:Props) extends BasePipeline(sc, props){

  private val file:File = new File(props.prop("path"))

  private val charset:String = props.prop("charset", "utf-8")

  /** Column names, in the order the values are written to each record. */
  val header:List[String] = {
    val needSaveFields:String = props.prop("needSaveFields", "*").trim
    if(needSaveFields == "*"){
      ("_url" :: sc.fields.map(_.name).toList).sorted
    }else{
      needSaveFields.split(",|，").map(_.trim).toList
    }
  }

  // NOTE(review): this format is shared by the parser and the printer. With
  // skipHeaderRecord set, the printer is not expected to emit a header row while the
  // parser skips the first row of an existing file -- confirm against the commons-csv
  // version in use that the first data row is not dropped when the cache is loaded.
  private val format:CSVFormat = CSVFormat.DEFAULT.withHeader(header:_*).withSkipHeaderRecord()


  /** De-duplication helper; only defined when `distinctField` is configured. */
  private val distinctOpt:Option[Distinct] = {
    props.propOption("distinctField").map{ disField=>
      // Load the values of the distinct field already stored in an existing file.
      val loadCacheOp:ConcurrentHashSet[Any]=>Unit = (set:ConcurrentHashSet[Any])=> {
        if(file.exists()){
          val in = new FileInputStream(file)
          try {
            // The reader is created inside the try so the stream is released even when
            // the charset name is rejected.
            val reader = new InputStreamReader(in, charset)
            val parser:CSVParser = format.parse(reader)
            parser.iterator().asScala.foreach{ rec=>
              set.add(rec.get(disField))
            }
          }catch {
            case ie:IllegalArgumentException =>
              // Keep the original exception as the cause so the failing lookup stays visible.
              throw new RuntimeException(s"无效的[distinctField=$disField],请检查配置文件:${sc.path}", ie)
          }finally {
            in.close()
          }
        }
      }
      // Explicit type argument: the generic getter must not have its type inferred as Nothing.
      val distValOp:ResultItems => Any = (ris:ResultItems) => ris.get[Any](disField)
      new HashSetDistinct(loadCacheOp,distValOp)
    }
  }

  /** Printer bound to the target file, opened in append or overwrite mode per `model`. */
  val csvPrinter:CSVPrinter = {
    val append = props.prop("model") match {
      case "append" => true
      case "override" => false
      case other =>
        // An untyped pattern also matches null, so an unset property gets the same message.
        val msg:String = s"无效的Model:$other,CSVPipeline的[model]属性只支持[append,override],请检查配置文件:${sc.path}"
        throw new RuntimeException(msg)
    }
    val out = new FileOutputStream(file, append)
    try {
      new CSVPrinter(new OutputStreamWriter(out, charset), format)
    } catch {
      case e:Throwable =>
        // Release the file handle when the writer or printer cannot be created; rethrown unchanged.
        out.close()
        throw e
    }
  }

  /** Guards the distinct check and the write so that they happen as one step. */
  val lock:Lock = new ReentrantLock()

  /**
    * Writes one record for `resultItems` unless the distinct check rejects it.
    *
    * Calls `onSuccess` after a successful write, `onError` when writing fails and
    * `onSkip` when the distinct check rejects the item.
    *
    * @param resultItems extracted values; the fields listed in `header` are read as strings
    * @param task        the task the result belongs to, handed on to the callbacks
    */
  override def process(resultItems: ResultItems, task: Task): Unit = {

    // Returns true when the record was written; failures are logged and reported via onError.
    def save(): Boolean = {
      try {
        val item:Array[String] = header.map(key=> resultItems.get[String](key)).toArray
        csvPrinter.printRecord(item:_*)
        logInfo(s"CSVPipeline:保存[url=${resultItems.get[Any]("_url")}]成功.....")
        true
      }catch {
        case e:Exception =>
          logError("csvPipeline保存失败!",e)
          onError(resultItems, task)
          false
      }
    }

    lock.lock()
    try {
      // Without a configured distinct field every item is written.
      if(distinctOpt.forall(_.isUniqueAndAdd(resultItems))){
        // onSuccess must not fire after a failed write, which already triggered onError.
        if(save()){
          onSuccess(resultItems, task)
        }
      }else{
        onSkip(resultItems, task)
      }
    }finally lock.unlock()

  }

  /** Closes the printer, or only flushes it when `closeAtTheEnd` is set to `false`. */
  override def close(): Unit ={
    if(props.prop("closeAtTheEnd","true").toBoolean) {
      csvPrinter.close()
    }else{
      csvPrinter.flush()
    }
  }

}
